/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexing.common.task;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.impl.AggregateProjectionSpec;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.GuiceAnnotationIntrospector;
import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.indexing.common.task.CompactionTask.Builder;
import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.NativeCompactionRunner.PartitionConfigurationManager;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.input.DruidInputSource;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.FloatMinAggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.firstlast.first.FloatFirstAggregatorFactory;
import org.apache.druid.query.aggregation.firstlast.last.DoubleLastAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.SimpleQueryableIndex;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.BaseColumnHolder;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnIndexSupplier;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.CompressionFactory.LongEncodingStrategy;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.data.ListIndexed;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.BatchIOConfig;
import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.apache.druid.segment.loading.NoopSegmentCacheManager;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.realtime.ChatHandlerProvider;
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import org.apache.druid.segment.transform.CompactionTransformSpec;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

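/**
 * Unit tests for {@link CompactionTask}: builder validation, JSON serde round-trips,
 * and the ingestion specs produced by {@link NativeCompactionRunner} under various
 * tuning and granularity configurations.
 */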
@RunWith(MockitoJUnitRunner.class)
public class CompactionTaskTest
{
  private static final long SEGMENT_SIZE_BYTES = 100;
  private static final int NUM_ROWS_PER_SEGMENT = 10;
  private static final String DATA_SOURCE = "dataSource";
  private static final String TIMESTAMP_COLUMN = "timestamp";
  private static final String MIXED_TYPE_COLUMN = "string_to_double";
  private static final Interval COMPACTION_INTERVAL = Intervals.of("2017-01-01/2017-07-01");
  private static final List<Interval> SEGMENT_INTERVALS = ImmutableList.of(
      Intervals.of("2017-01-01/2017-02-01"),
      Intervals.of("2017-02-01/2017-03-01"),
      Intervals.of("2017-03-01/2017-04-01"),
      Intervals.of("2017-04-01/2017-05-01"),
      Intervals.of("2017-05-01/2017-06-01"),
      Intervals.of("2017-06-01/2017-07-01"),
      // overlapping intervals
      Intervals.of("2017-06-01/2017-06-02"),
      Intervals.of("2017-06-15/2017-06-16"),
      Intervals.of("2017-06-30/2017-07-01")
  );
  private static final Map<Interval, DimensionSchema> MIXED_TYPE_COLUMN_MAP = new HashMap<>();
  private static final CompactionTask.CompactionTuningConfig TUNING_CONFIG = createTuningConfig();

  private static final TestUtils TEST_UTILS = new TestUtils();
  private static final Map<DataSegment, File> SEGMENT_MAP = new HashMap<>();
  private static final CoordinatorClient COORDINATOR_CLIENT = new TestCoordinatorClient(SEGMENT_MAP);
  private static final ObjectMapper OBJECT_MAPPER = setupInjectablesInObjectMapper(new DefaultObjectMapper());
  private static final String CONFLICTING_SEGMENT_GRANULARITY_FORMAT =
      "Conflicting segment granularities found %s(segmentGranularity) and %s(granularitySpec.segmentGranularity).\n"
      + "Remove `segmentGranularity` and set the `granularitySpec.segmentGranularity` to the expected granularity";

  private static final ServiceMetricEvent.Builder METRIC_BUILDER = new ServiceMetricEvent.Builder();

  private static Map<String, DimensionSchema> DIMENSIONS;
  private static List<AggregatorFactory> AGGREGATORS;
  private static List<DataSegment> SEGMENTS;

  private TaskToolbox toolbox;
  private SegmentCacheManagerFactory segmentCacheManagerFactory;

  @BeforeClass
  public static void setupClass()
  {
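    // Shared fixtures: the mixed-type column is string-typed for the first four months
    // and double-typed afterwards; the dimensions, aggregators, and segment-to-file map
    // built here are reused by every test.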
    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-01-01/2017-02-01"), new StringDimensionSchema(MIXED_TYPE_COLUMN));
    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-02-01/2017-03-01"), new StringDimensionSchema(MIXED_TYPE_COLUMN));
    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-03-01/2017-04-01"), new StringDimensionSchema(MIXED_TYPE_COLUMN));
    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-04-01/2017-05-01"), new StringDimensionSchema(MIXED_TYPE_COLUMN));
    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-05-01/2017-06-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-01/2017-07-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));

    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-01/2017-06-02"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-15/2017-06-16"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));
    MIXED_TYPE_COLUMN_MAP.put(Intervals.of("2017-06-30/2017-07-01"), new DoubleDimensionSchema(MIXED_TYPE_COLUMN));

    DIMENSIONS = new HashMap<>();
    AGGREGATORS = new ArrayList<>();

    DIMENSIONS.put(ColumnHolder.TIME_COLUMN_NAME, new LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME));
    DIMENSIONS.put(TIMESTAMP_COLUMN, new LongDimensionSchema(TIMESTAMP_COLUMN));
    int numUmbrellaIntervals = 6;
    for (int i = 0; i < numUmbrellaIntervals; i++) {
      final StringDimensionSchema schema = new StringDimensionSchema(
          "string_dim_" + i,
          null,
          null
      );
      DIMENSIONS.put(schema.getName(), schema);
    }
    for (int i = 0; i < numUmbrellaIntervals; i++) {
      final LongDimensionSchema schema = new LongDimensionSchema("long_dim_" + i);
      DIMENSIONS.put(schema.getName(), schema);
    }
    for (int i = 0; i < numUmbrellaIntervals; i++) {
      final FloatDimensionSchema schema = new FloatDimensionSchema("float_dim_" + i);
      DIMENSIONS.put(schema.getName(), schema);
    }
    for (int i = 0; i < numUmbrellaIntervals; i++) {
      final DoubleDimensionSchema schema = new DoubleDimensionSchema("double_dim_" + i);
      DIMENSIONS.put(schema.getName(), schema);
    }

    AGGREGATORS.add(new CountAggregatorFactory("agg_0"));
    AGGREGATORS.add(new LongSumAggregatorFactory("agg_1", "long_dim_1"));
    AGGREGATORS.add(new LongMaxAggregatorFactory("agg_2", "long_dim_2"));
    AGGREGATORS.add(new FloatFirstAggregatorFactory("agg_3", "float_dim_3", null));
    AGGREGATORS.add(new DoubleLastAggregatorFactory("agg_4", "double_dim_4", null));

    for (int i = 0; i < SEGMENT_INTERVALS.size(); i++) {
      SEGMENT_MAP.put(
          new DataSegment(
              DATA_SOURCE,
              SEGMENT_INTERVALS.get(i),
              "version_" + i,
              ImmutableMap.of(),
              findDimensions(i, SEGMENT_INTERVALS.get(i)),
              AGGREGATORS.stream().map(AggregatorFactory::getName).collect(Collectors.toList()),
              new NumberedShardSpec(0, 1),
              0,
              SEGMENT_SIZE_BYTES
          ),
          new File("file_" + i)
      );
    }
    SEGMENTS = new ArrayList<>(SEGMENT_MAP.keySet());
  }

  private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMapper)
  {
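    // Wire Guice-backed injectable values into the mapper so @JacksonInject fields on
    // tasks deserialize in tests, and register the shard-spec and tuning-config subtypes.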
    final GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
    objectMapper.setAnnotationIntrospectors(
        new AnnotationIntrospectorPair(
            guiceIntrospector,
            objectMapper.getSerializationConfig().getAnnotationIntrospector()
        ),
        new AnnotationIntrospectorPair(
            guiceIntrospector,
            objectMapper.getDeserializationConfig().getAnnotationIntrospector()
        )
    );
    GuiceInjectableValues injectableValues = new GuiceInjectableValues(
        GuiceInjectors.makeStartupInjectorWithModules(
            ImmutableList.of(
                binder -> {
                  binder.bind(AuthorizerMapper.class).toInstance(AuthTestUtils.TEST_AUTHORIZER_MAPPER);
                  binder.bind(ChatHandlerProvider.class).toInstance(new NoopChatHandlerProvider());
                  binder.bind(RowIngestionMetersFactory.class).toInstance(TEST_UTILS.getRowIngestionMetersFactory());
                  binder.bind(CoordinatorClient.class).toInstance(COORDINATOR_CLIENT);
                  binder.bind(SegmentCacheManagerFactory.class)
                        .toInstance(new SegmentCacheManagerFactory(TestIndex.INDEX_IO, objectMapper));
                  binder.bind(AppenderatorsManager.class).toInstance(new TestAppenderatorsManager());
                  binder.bind(ExprMacroTable.class).toInstance(TestExprMacroTable.INSTANCE);
                }
            )
        )
    );
    objectMapper.setInjectableValues(injectableValues);
    objectMapper.registerModule(
        new SimpleModule().registerSubtypes(new NamedType(NumberedShardSpec.class, "NumberedShardSpec"))
    );
    objectMapper.registerModules(new IndexingServiceTuningConfigModule().getJacksonModules());
    return objectMapper;
  }

  private static List<String> findDimensions(int startIndex, Interval segmentInterval)
  {
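    // Rotate the per-type dimension names by startIndex (modulo the number of umbrella
    // intervals) so each segment lists them in a different order, then append the
    // interval-specific mixed-type column.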
    final List<String> dimensions = new ArrayList<>();
    dimensions.add(TIMESTAMP_COLUMN);
    for (int i = 0; i < 6; i++) {
      int postfix = i + startIndex;
      postfix = postfix % 6;
      dimensions.add("string_dim_" + postfix);
      dimensions.add("long_dim_" + postfix);
      dimensions.add("float_dim_" + postfix);
      dimensions.add("double_dim_" + postfix);
    }
    dimensions.add(MIXED_TYPE_COLUMN_MAP.get(segmentInterval).getName());
    return dimensions;
  }

  private static CompactionTask.CompactionTuningConfig createTuningConfig()
  {
    return TuningConfigBuilder.forCompactionTask()
        .withMaxRowsInMemory(500_000)
        .withMaxBytesInMemory(1_000_000L)
        .withIndexSpec(
            IndexSpec.builder()
                     .withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
                     .withDimensionCompression(CompressionStrategy.LZ4)
                     .withMetricCompression(CompressionStrategy.LZF)
                     .withLongEncoding(LongEncodingStrategy.LONGS)
                     .build()
        )
        .withForceGuaranteedRollup(true)
        .withReportParseExceptions(false)
        .withPushTimeout(5000L)
        .build();
  }

  @Rule
  public ExpectedException expectedException = ExpectedException.none();

  private StubServiceEmitter emitter;

  @Before
  public void setup()
  {
    final TestIndexIO testIndexIO = new TestIndexIO(OBJECT_MAPPER, SEGMENT_MAP);
    emitter = new StubServiceEmitter();
    toolbox = makeTaskToolbox(
        new TestTaskActionClient(new ArrayList<>(SEGMENT_MAP.keySet())),
        testIndexIO,
        SEGMENT_MAP
    );
    segmentCacheManagerFactory = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, OBJECT_MAPPER);
  }

  @Test
  public void testCreateCompactionTaskWithGranularitySpec()
  {
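    // Setting segmentGranularity directly and via granularitySpec should produce tasks
    // with the same effective segment granularity.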
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
    builder.tuningConfig(createTuningConfig());
    builder.segmentGranularity(Granularities.HOUR);
    final CompactionTask taskCreatedWithSegmentGranularity = builder.build();

    final Builder builder2 = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    builder2.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
    builder2.tuningConfig(createTuningConfig());
    builder2.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY, null));
    final CompactionTask taskCreatedWithGranularitySpec = builder2.build();
    Assert.assertEquals(
        taskCreatedWithGranularitySpec.getSegmentGranularity(),
        taskCreatedWithSegmentGranularity.getSegmentGranularity()
    );
  }

  @Test(expected = IAE.class)
  public void testCreateCompactionTaskWithConflictingGranularitySpecAndSegmentGranularityShouldThrowIAE()
  {
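    // segmentGranularity=HOUR conflicts with granularitySpec.segmentGranularity=MINUTE;
    // building the task should throw an IAE carrying the conflict message.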
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
    builder.tuningConfig(createTuningConfig());
    builder.segmentGranularity(Granularities.HOUR);
    builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.MINUTE, Granularities.DAY, null));
    try {
      builder.build();
    }
    catch (IAE iae) {
      Assert.assertEquals(
          StringUtils.format(
              CONFLICTING_SEGMENT_GRANULARITY_FORMAT,
              Granularities.HOUR,
              Granularities.MINUTE
          ),
          iae.getMessage()
      );
      throw iae;
    }
    Assert.fail("Should not have reached here!");
  }

  @Test
  public void testCreateCompactionTaskWithTransformSpec()
  {
    CompactionTransformSpec transformSpec =
        new CompactionTransformSpec(new SelectorDimFilter("dim1", "foo", null));
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
    builder.tuningConfig(createTuningConfig());
    builder.transformSpec(transformSpec);
    final CompactionTask taskCreatedWithTransformSpec = builder.build();
    Assert.assertEquals(
        transformSpec,
        taskCreatedWithTransformSpec.getTransformSpec()
    );
  }

  @Test
  public void testCreateCompactionTaskWithMetricsSpec()
  {
    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{new CountAggregatorFactory("cnt")};
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
    builder.tuningConfig(createTuningConfig());
    builder.metricsSpec(aggregatorFactories);
    final CompactionTask taskCreatedWithMetricsSpec = builder.build();
    Assert.assertArrayEquals(
        aggregatorFactories,
        taskCreatedWithMetricsSpec.getMetricsSpec()
    );
  }

  @Test(expected = IAE.class)
  public void testCreateCompactionTaskWithNullSegmentGranularityInGranularitySpecAndSegmentGranularityShouldThrowIAE()
  {
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
    builder.tuningConfig(createTuningConfig());
    builder.segmentGranularity(Granularities.HOUR);
    builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null, Granularities.DAY, null));
    try {
      builder.build();
    }
    catch (IAE iae) {
      Assert.assertEquals(
          StringUtils.format(
              CONFLICTING_SEGMENT_GRANULARITY_FORMAT,
              Granularities.HOUR,
              null
          ),
          iae.getMessage()
      );
      throw iae;
    }
    Assert.fail("Should not have reached here!");
  }

  @Test
  public void testCreateCompactionTaskWithSameGranularitySpecAndSegmentGranularityShouldSucceed()
  {
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
    builder.tuningConfig(createTuningConfig());
    builder.segmentGranularity(Granularities.HOUR);
    builder.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.HOUR, Granularities.DAY, null));
    final CompactionTask taskCreatedWithSegmentGranularity = builder.build();
    Assert.assertEquals(Granularities.HOUR, taskCreatedWithSegmentGranularity.getSegmentGranularity());
  }

  @Test
  public void testSerdeWithInterval() throws IOException
  {
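    // Round-trip the task through JSON and verify the deserialized copy matches.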
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    final CompactionTask task = builder
        .inputSpec(
            new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))
        )
        .tuningConfig(createTuningConfig())
        .context(ImmutableMap.of("testKey", "testContext"))
        .build();

    final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
    final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class);
    assertEquals(task, fromJson);
  }

  @Test
  public void testSerdeWithSegments() throws IOException
  {
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    final CompactionTask task = builder
        .segments(SEGMENTS)
        .tuningConfig(createTuningConfig())
        .context(ImmutableMap.of("testKey", "testContext"))
        .build();

    final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
    final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class);
    assertEquals(task, fromJson);
  }

  @Test
  public void testSerdeWithDimensions() throws IOException
  {
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );

    final CompactionTask task = builder
        .segments(SEGMENTS)
        .dimensionsSpec(
            new DimensionsSpec(
                ImmutableList.of(
                    new StringDimensionSchema("dim1"),
                    new StringDimensionSchema("dim2"),
                    new StringDimensionSchema("dim3")
                )
            )
        )
        .tuningConfig(createTuningConfig())
        .context(ImmutableMap.of("testKey", "testVal"))
        .build();

    final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
    final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class);
    assertEquals(task, fromJson);
  }

  @Test
  public void testSerdeWithProjections() throws IOException
  {
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    final List<AggregateProjectionSpec> projections = List.of(
        AggregateProjectionSpec.builder("test")
                               .groupingColumns(
                                   new StringDimensionSchema("dim1"),
                                   new StringDimensionSchema("dim2"),
                                   new StringDimensionSchema("dim3")
                               )
                               .aggregators(
                                   new CountAggregatorFactory("count"),
                                   new LongSumAggregatorFactory("sum_long_dim_1", "long_dim_1")
                               )
                               .build()
    );

    final CompactionTask task = builder
        .segments(SEGMENTS)
        .projections(projections)
        .tuningConfig(createTuningConfig())
        .build();

    final byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(task);
    final CompactionTask fromJson = OBJECT_MAPPER.readValue(bytes, CompactionTask.class);
    Assert.assertEquals(projections, fromJson.getProjections());
    assertEquals(task, fromJson);
  }

  @Test
  public void testSerdeWithOldTuningConfigSuccessfullyDeserializeToNewOne() throws IOException
  {
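    // A task serialized with the legacy IndexTuningConfig should deserialize into a
    // CompactionTask carrying the equivalent CompactionTuningConfig.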
    final OldCompactionTaskWithAnyTuningConfigType oldTask = new OldCompactionTaskWithAnyTuningConfigType(
        null,
        null,
        DATA_SOURCE,
        null,
        SEGMENTS,
        null,
        null,
        null,
        null,
        null,
        TuningConfigBuilder
            .forIndexTask()
            .withMaxRowsInMemory(500000)
            .withIndexSpec(
                IndexSpec.builder()
                         .withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
                         .withDimensionCompression(CompressionStrategy.LZ4)
                         .withMetricCompression(CompressionStrategy.LZF)
                         .withLongEncoding(LongEncodingStrategy.LONGS)
                         .build()
            )
            .withForceGuaranteedRollup(true)
            .withReportParseExceptions(false)
            .withPublishTimeout(5000L)
            .build(),
        null,
        toolbox.getJsonMapper(),
        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
        toolbox.getChatHandlerProvider(),
        toolbox.getRowIngestionMetersFactory(),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory,
        toolbox.getAppenderatorsManager()
    );

    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );

    final CompactionTask expectedFromJson = builder
        .segments(SEGMENTS)
        .tuningConfig(CompactionTask.getTuningConfig(oldTask.getTuningConfig()))
        .build();

    final ObjectMapper mapper = new DefaultObjectMapper((DefaultObjectMapper) OBJECT_MAPPER);
    mapper.registerSubtypes(new NamedType(OldCompactionTaskWithAnyTuningConfigType.class, "compact"));
    final byte[] bytes = mapper.writeValueAsBytes(oldTask);
    final CompactionTask fromJson = mapper.readValue(bytes, CompactionTask.class);
    assertEquals(expectedFromJson, fromJson);
  }

  @Test
  public void testInputSourceResources()
  {
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    final CompactionTask task = builder
        .inputSpec(
            new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS))
        )
        .tuningConfig(createTuningConfig())
        .context(ImmutableMap.of("testKey", "testContext"))
        .build();

    Assert.assertTrue(task.getInputSourceResources().isEmpty());
  }

  @Test
  public void testGetTuningConfigWithIndexTuningConfig()
  {
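    // Converting an IndexTuningConfig should carry all shared settings over to the
    // resulting CompactionTuningConfig.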
    IndexTuningConfig indexTuningConfig = TuningConfigBuilder
        .forIndexTask()
        .withMaxRowsInMemory(500000)
        .withMaxBytesInMemory(1000000L)
        .withIndexSpec(
            IndexSpec.builder()
                     .withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
                     .withDimensionCompression(CompressionStrategy.LZ4)
                     .withMetricCompression(CompressionStrategy.LZF)
                     .withLongEncoding(LongEncodingStrategy.LONGS)
                     .build()
        )
        .withForceGuaranteedRollup(true)
        .withReportParseExceptions(false)
        .build();

    CompactionTask.CompactionTuningConfig compactionTuningConfig = TuningConfigBuilder
        .forCompactionTask()
        .withMaxRowsInMemory(500000)
        .withMaxBytesInMemory(1000000L)
        .withIndexSpec(
            IndexSpec.builder()
                     .withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
                     .withDimensionCompression(CompressionStrategy.LZ4)
                     .withMetricCompression(CompressionStrategy.LZF)
                     .withLongEncoding(LongEncodingStrategy.LONGS)
                     .build()
        )
        .withForceGuaranteedRollup(true)
        .withReportParseExceptions(false)
        .build();

    Assert.assertEquals(compactionTuningConfig, CompactionTask.getTuningConfig(indexTuningConfig));
  }

  @Test
  public void testGetTuningConfigWithParallelIndexTuningConfig()
  {
    ParallelIndexTuningConfig parallelIndexTuningConfig = TuningConfigBuilder
        .forParallelIndexTask()
        .withMaxRowsInMemory(500000)
        .withMaxBytesInMemory(1000000L)
        .withIndexSpec(
            IndexSpec.builder()
                     .withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
                     .withDimensionCompression(CompressionStrategy.LZ4)
                     .withMetricCompression(CompressionStrategy.LZF)
                     .withLongEncoding(LongEncodingStrategy.LONGS)
                     .build()
        )
        .withForceGuaranteedRollup(true)
        .withReportParseExceptions(false)
        .withPushTimeout(5000L)
        .build();

    CompactionTask.CompactionTuningConfig compactionTuningConfig = TuningConfigBuilder
        .forCompactionTask()
        .withMaxRowsInMemory(500000)
        .withMaxBytesInMemory(1000000L)
        .withIndexSpec(
            IndexSpec.builder()
                     .withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
                     .withDimensionCompression(CompressionStrategy.LZ4)
                     .withMetricCompression(CompressionStrategy.LZF)
                     .withLongEncoding(LongEncodingStrategy.LONGS)
                     .build()
        )
        .withForceGuaranteedRollup(true)
        .withReportParseExceptions(false)
        .withPushTimeout(5000L)
        .build();

    Assert.assertEquals(compactionTuningConfig, CompactionTask.getTuningConfig(parallelIndexTuningConfig));
  }

  private static void assertEquals(CompactionTask expected, CompactionTask actual)
  {
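    // Field-by-field comparison used by the serde tests in place of a full equals().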
    Assert.assertEquals(expected.getType(), actual.getType());
    Assert.assertEquals(expected.getDataSource(), actual.getDataSource());
    Assert.assertEquals(expected.getIoConfig(), actual.getIoConfig());
    Assert.assertEquals(expected.getDimensionsSpec(), actual.getDimensionsSpec());
    Assert.assertArrayEquals(expected.getMetricsSpec(), actual.getMetricsSpec());
    Assert.assertEquals(expected.getTuningConfig(), actual.getTuningConfig());
    Assert.assertEquals(expected.getContext(), actual.getContext());
  }

  @Test
  public void testSegmentProviderFindSegmentsWithEmptySegmentsThrowException()
  {
    final SegmentProvider provider = new SegmentProvider(
        "datasource",
        new CompactionIntervalSpec(Intervals.of("2021-01-01/P1D"), null)
    );

    expectedException.expect(IllegalStateException.class);
    expectedException.expectMessage(
        "No segments found for compaction. Please check that datasource name and interval are correct."
    );
    provider.checkSegments(LockGranularity.TIME_CHUNK, ImmutableList.of());
  }

  @Test
  public void testCreateIngestionSchema() throws IOException
  {
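    // With no overrides, one ingestion spec is created per umbrella month, dimensions
    // are auto-generated from the segments, and metrics use their combining factories.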
    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        null,
        null,
        null,
        null,
        null,
        METRIC_BUILDER,
        false
    );

    final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(TUNING_CONFIG),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );

    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();

    ingestionSpecs.sort(
        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
        )
    );
    Assert.assertEquals(6, ingestionSpecs.size());
    assertIngestionSchema(
        ingestionSpecs,
        expectedDimensionsSpec,
        AGGREGATORS.stream().map(AggregatorFactory::getCombiningFactory).collect(Collectors.toList()),
        SEGMENT_INTERVALS,
        Granularities.MONTH,
        Granularities.NONE,
        BatchIOConfig.DEFAULT_DROP_EXISTING
    );
  }

  @Test
  public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOException
  {
    final CompactionTask.CompactionTuningConfig tuningConfig = TuningConfigBuilder
        .forCompactionTask()
        .withTargetPartitionSize(100000)
        .withMaxRowsInMemory(500000)
        .withMaxBytesInMemory(1000000L)
        .withIndexSpec(
            IndexSpec.builder()
                     .withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
                     .withDimensionCompression(CompressionStrategy.LZ4)
                     .withMetricCompression(CompressionStrategy.LZF)
                     .withLongEncoding(LongEncodingStrategy.LONGS)
                     .build()
        )
        .withForceGuaranteedRollup(true)
        .withReportParseExceptions(false)
        .withMaxNumConcurrentSubTasks(10)
        .build();

    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        null,
        null,
        null,
        null,
        null,
        METRIC_BUILDER,
        false
    );

    final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(tuningConfig),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );

    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();

    ingestionSpecs.sort(
        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
        )
    );
    Assert.assertEquals(6, ingestionSpecs.size());
    assertIngestionSchema(
        ingestionSpecs,
        expectedDimensionsSpec,
        AGGREGATORS.stream().map(AggregatorFactory::getCombiningFactory).collect(Collectors.toList()),
        SEGMENT_INTERVALS,
        tuningConfig,
        Granularities.MONTH,
        Granularities.NONE,
        BatchIOConfig.DEFAULT_DROP_EXISTING
    );
  }

  @Test
  public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException
  {
    final CompactionTask.CompactionTuningConfig tuningConfig = TuningConfigBuilder
        .forCompactionTask()
        .withMaxRowsInMemory(500000)
        .withMaxBytesInMemory(1000000L)
        .withMaxTotalRows(1000000L)
        .withIndexSpec(
            IndexSpec.builder()
                     .withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
                     .withDimensionCompression(CompressionStrategy.LZ4)
                     .withMetricCompression(CompressionStrategy.LZF)
                     .withLongEncoding(LongEncodingStrategy.LONGS)
                     .build()
        )
        .withForceGuaranteedRollup(false)
        .withReportParseExceptions(false)
        .withPushTimeout(5000L)
        .build();

    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        null,
        null,
        null,
        null,
        null,
        METRIC_BUILDER,
        false
    );

    final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(tuningConfig),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );

    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();

    ingestionSpecs.sort(
        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
        )
    );
    Assert.assertEquals(6, ingestionSpecs.size());
    assertIngestionSchema(
        ingestionSpecs,
        expectedDimensionsSpec,
        AGGREGATORS.stream().map(AggregatorFactory::getCombiningFactory).collect(Collectors.toList()),
        SEGMENT_INTERVALS,
        tuningConfig,
        Granularities.MONTH,
        Granularities.NONE,
        BatchIOConfig.DEFAULT_DROP_EXISTING
    );
  }

  @Test
  public void testCreateIngestionSchemaWithNumShards() throws IOException
  {
    final CompactionTask.CompactionTuningConfig tuningConfig = TuningConfigBuilder
        .forCompactionTask()
        .withMaxRowsInMemory(500000)
        .withMaxBytesInMemory(1000000L)
        .withPartitionsSpec(new HashedPartitionsSpec(null, 3, null))
        .withIndexSpec(
            IndexSpec.builder()
                     .withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
                     .withDimensionCompression(CompressionStrategy.LZ4)
                     .withMetricCompression(CompressionStrategy.LZF)
                     .withLongEncoding(LongEncodingStrategy.LONGS)
                     .build()
        )
        .withForceGuaranteedRollup(true)
        .withReportParseExceptions(false)
        .withPushTimeout(5000L)
        .withMaxNumConcurrentSubTasks(10)
        .build();

    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        null,
        null,
        null,
        null,
        null,
        METRIC_BUILDER,
        false
    );

    final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(tuningConfig),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );
    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();

    ingestionSpecs.sort(
        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
        )
    );
    Assert.assertEquals(6, ingestionSpecs.size());
    assertIngestionSchema(
        ingestionSpecs,
        expectedDimensionsSpec,
        AGGREGATORS.stream().map(AggregatorFactory::getCombiningFactory).collect(Collectors.toList()),
        SEGMENT_INTERVALS,
        tuningConfig,
        Granularities.MONTH,
        Granularities.NONE,
        BatchIOConfig.DEFAULT_DROP_EXISTING
    );
  }

  @Test
  public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOException
  {
    final DimensionsSpec customSpec = new DimensionsSpec(
        Lists.newArrayList(
            new LongDimensionSchema("timestamp"),
            new StringDimensionSchema("string_dim_0"),
            new StringDimensionSchema("string_dim_1"),
            new StringDimensionSchema("string_dim_2"),
            new StringDimensionSchema("string_dim_3"),
            new StringDimensionSchema("string_dim_4"),
            new LongDimensionSchema("long_dim_0"),
            new LongDimensionSchema("long_dim_1"),
            new LongDimensionSchema("long_dim_2"),
            new LongDimensionSchema("long_dim_3"),
            new LongDimensionSchema("long_dim_4"),
            new FloatDimensionSchema("float_dim_0"),
            new FloatDimensionSchema("float_dim_1"),
            new FloatDimensionSchema("float_dim_2"),
            new FloatDimensionSchema("float_dim_3"),
            new FloatDimensionSchema("float_dim_4"),
            new DoubleDimensionSchema("double_dim_0"),
            new DoubleDimensionSchema("double_dim_1"),
            new DoubleDimensionSchema("double_dim_2"),
            new DoubleDimensionSchema("double_dim_3"),
            new DoubleDimensionSchema("double_dim_4"),
            new StringDimensionSchema(MIXED_TYPE_COLUMN)
        )
    );

    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        customSpec,
        null,
        null,
        null,
        null,
        METRIC_BUILDER,
        false
    );

    final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(TUNING_CONFIG),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );

    ingestionSpecs.sort(
        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
        )
    );
    Assert.assertEquals(6, ingestionSpecs.size());
    final List<DimensionsSpec> dimensionsSpecs = new ArrayList<>(6);
    IntStream.range(0, 6).forEach(i -> dimensionsSpecs.add(customSpec));
    assertIngestionSchema(
        ingestionSpecs,
        dimensionsSpecs,
        AGGREGATORS.stream().map(AggregatorFactory::getCombiningFactory).collect(Collectors.toList()),
        SEGMENT_INTERVALS,
        Granularities.MONTH,
        Granularities.NONE,
        BatchIOConfig.DEFAULT_DROP_EXISTING
    );
  }

  @Test
  public void testCreateIngestionSchemaWithCustomMetricsSpec() throws IOException
  {
    final AggregatorFactory[] customMetricsSpec = new AggregatorFactory[]{
        new CountAggregatorFactory("custom_count"),
        new LongSumAggregatorFactory("custom_long_sum", "agg_1"),
        new FloatMinAggregatorFactory("custom_float_min", "agg_3"),
        new DoubleMaxAggregatorFactory("custom_double_max", "agg_4")
    };

    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        null,
        null,
        customMetricsSpec,
        null,
        null,
        METRIC_BUILDER,
        false
    );

    final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(TUNING_CONFIG),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );

    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();

    ingestionSpecs.sort(
        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
        )
    );
    Assert.assertEquals(6, ingestionSpecs.size());
    assertIngestionSchema(
        ingestionSpecs,
        expectedDimensionsSpec,
        Arrays.asList(customMetricsSpec),
        SEGMENT_INTERVALS,
        Granularities.MONTH,
        Granularities.NONE,
        BatchIOConfig.DEFAULT_DROP_EXISTING
    );
  }

  @Test
  public void testCreateIngestionSchemaWithCustomSegments() throws IOException
  {
    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        null,
        null,
        null,
        null,
        null,
        METRIC_BUILDER,
        false
    );

    final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(TUNING_CONFIG),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );

    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();

    ingestionSpecs.sort(
        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
        )
    );
    Assert.assertEquals(6, ingestionSpecs.size());
    assertIngestionSchema(
        ingestionSpecs,
        expectedDimensionsSpec,
        AGGREGATORS.stream().map(AggregatorFactory::getCombiningFactory).collect(Collectors.toList()),
        SEGMENT_INTERVALS,
        Granularities.MONTH,
        Granularities.NONE,
        BatchIOConfig.DEFAULT_DROP_EXISTING
    );
  }

  @Test
  public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOException
  {
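    // Dropping one segment from the requested set should fail the consistency check
    // against the segments currently in use.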
    expectedException.expect(CoreMatchers.instanceOf(IllegalStateException.class));
    expectedException.expectMessage(CoreMatchers.containsString("are different from the current used segments"));

    final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
    Collections.sort(segments);
    // Remove one segment in the middle
    segments.remove(segments.size() / 2);
    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)),
        null,
        null,
        null,
        null,
        null,
        METRIC_BUILDER,
        false
    );

    NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(TUNING_CONFIG),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );
  }

  @Test
  public void testMissingMetadata() throws IOException
  {
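    // Removing a segment's index metadata should make data schema creation fail fast.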
    expectedException.expect(RuntimeException.class);
    expectedException.expectMessage(CoreMatchers.startsWith("Index metadata doesn't exist for segment"));

    final TestIndexIO indexIO = (TestIndexIO) toolbox.getIndexIO();
    indexIO.removeMetadata(Iterables.getFirst(indexIO.getQueryableIndexMap().keySet(), null));
    final List<DataSegment> segments = new ArrayList<>(SEGMENTS);
    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, SpecificSegmentsSpec.fromSegments(segments)),
        null,
        null,
        null,
        null,
        null,
        METRIC_BUILDER,
        false
    );

    NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(TUNING_CONFIG),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );
  }

  @Test
  public void testEmptyInterval()
  {
    expectedException.expect(IllegalArgumentException.class);
    expectedException.expectMessage(CoreMatchers.containsString("must specify a nonempty interval"));

    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );

    @SuppressWarnings("unused")
    final CompactionTask task = builder
        .interval(Intervals.of("2000-01-01/2000-01-01"))
        .build();
  }

  @Test
  public void testSegmentGranularityAndNullQueryGranularity() throws IOException
  {
    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        null,
        null,
        null,
        new ClientCompactionTaskGranularitySpec(new PeriodGranularity(Period.months(3), null, null), null, null),
        null,
        METRIC_BUILDER,
        false
    );

    final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(TUNING_CONFIG),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );
    final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
        new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
    );

    ingestionSpecs.sort(
        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
        )
    );
    Assert.assertEquals(1, ingestionSpecs.size());
    assertIngestionSchema(
        ingestionSpecs,
        expectedDimensionsSpec,
        AGGREGATORS.stream().map(AggregatorFactory::getCombiningFactory).collect(Collectors.toList()),
        Collections.singletonList(COMPACTION_INTERVAL),
        new PeriodGranularity(Period.months(3), null, null),
        Granularities.NONE,
        BatchIOConfig.DEFAULT_DROP_EXISTING
    );
  }

  @Test
  public void testQueryGranularityAndNullSegmentGranularity() throws IOException
  {
    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        null,
        null,
        null,
        new ClientCompactionTaskGranularitySpec(null, new PeriodGranularity(Period.months(3), null, null), null),
        null,
        METRIC_BUILDER,
        false
    );
    final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(TUNING_CONFIG),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );
    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();

    ingestionSpecs.sort(
        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
        )
    );
    Assert.assertEquals(6, ingestionSpecs.size());
    assertIngestionSchema(
        ingestionSpecs,
        expectedDimensionsSpec,
        AGGREGATORS.stream().map(AggregatorFactory::getCombiningFactory).collect(Collectors.toList()),
        SEGMENT_INTERVALS,
        Granularities.MONTH,
        new PeriodGranularity(Period.months(3), null, null),
        BatchIOConfig.DEFAULT_DROP_EXISTING
    );
  }

  @Test
  public void testQueryGranularityAndSegmentGranularityNonNull() throws IOException
  {
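    // With both granularities set, a single spec covering the whole compaction interval
    // is expected, and the segment-analysis fetch metric should be emitted exactly once.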
    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        null,
        null,
        null,
        new ClientCompactionTaskGranularitySpec(
            new PeriodGranularity(Period.months(3), null, null),
            new PeriodGranularity(Period.months(3), null, null),
            null
        ),
        null,
        METRIC_BUILDER,
        false
    );

    final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(TUNING_CONFIG),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );

    final List<DimensionsSpec> expectedDimensionsSpec = ImmutableList.of(
        new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
    );

    ingestionSpecs.sort(
        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
        )
    );
    Assert.assertEquals(1, ingestionSpecs.size());
    assertIngestionSchema(
        ingestionSpecs,
        expectedDimensionsSpec,
        AGGREGATORS.stream().map(AggregatorFactory::getCombiningFactory).collect(Collectors.toList()),
        Collections.singletonList(COMPACTION_INTERVAL),
        new PeriodGranularity(Period.months(3), null, null),
        new PeriodGranularity(Period.months(3), null, null),
        BatchIOConfig.DEFAULT_DROP_EXISTING
    );
    emitter.verifyEmitted("compact/segmentAnalyzer/fetchAndProcessMillis", 1);
  }

  @Test
  public void testNullGranularitySpec() throws IOException
  {
    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        null,
        null,
        null,
        null,
        null,
        METRIC_BUILDER,
        false
    );

    final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(TUNING_CONFIG),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );

    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();

    ingestionSpecs.sort(
        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
        )
    );
    Assert.assertEquals(6, ingestionSpecs.size());
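    // Expect the existing MONTH segment granularity and NONE query granularity to be carried over.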
    assertIngestionSchema(
        ingestionSpecs,
        expectedDimensionsSpec,
        AGGREGATORS.stream().map(AggregatorFactory::getCombiningFactory).collect(Collectors.toList()),
        SEGMENT_INTERVALS,
        Granularities.MONTH,
        Granularities.NONE,
        BatchIOConfig.DEFAULT_DROP_EXISTING
    );
  }

  @Test
  public void testGranularitySpecWithNullQueryGranularityAndNullSegmentGranularity()
      throws IOException
  {
    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        null,
        null,
        null,
        new ClientCompactionTaskGranularitySpec(null, null, null),
        null,
        METRIC_BUILDER,
        false
    );

    final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(TUNING_CONFIG),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );

    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();

    ingestionSpecs.sort(
        (s1, s2) -> Comparators.intervalsByStartThenEnd().compare(
            s1.getDataSchema().getGranularitySpec().inputIntervals().get(0),
            s2.getDataSchema().getGranularitySpec().inputIntervals().get(0)
        )
    );
    Assert.assertEquals(6, ingestionSpecs.size());
    assertIngestionSchema(
        ingestionSpecs,
        expectedDimensionsSpec,
        AGGREGATORS.stream().map(AggregatorFactory::getCombiningFactory).collect(Collectors.toList()),
        SEGMENT_INTERVALS,
        Granularities.MONTH,
        Granularities.NONE,
        BatchIOConfig.DEFAULT_DROP_EXISTING
    );
  }

  @Test
  public void testGranularitySpecWithNotNullRollup()
      throws IOException
  {
    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        null,
        null,
        null,
        new ClientCompactionTaskGranularitySpec(null, null, true),
        null,
        METRIC_BUILDER,
        false
    );

    final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(TUNING_CONFIG),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );

    Assert.assertEquals(6, ingestionSpecs.size());
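    // Expect rollup=true since the task explicitly sets it in the granularitySpec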
    for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) {
      Assert.assertTrue(indexIngestionSpec.getDataSchema().getGranularitySpec().isRollup());
    }
  }

  @Test
  public void testGranularitySpecWithNullRollup()
      throws IOException
  {
    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        null,
        null,
        null,
        new ClientCompactionTaskGranularitySpec(null, null, null),
        null,
        METRIC_BUILDER,
        false
    );

    final List<ParallelIndexIngestionSpec> ingestionSpecs = NativeCompactionRunner.createIngestionSpecs(
        dataSchemasForIntervals,
        toolbox,
        new CompactionIOConfig(null, false, null),
        new PartitionConfigurationManager(TUNING_CONFIG),
        COORDINATOR_CLIENT,
        segmentCacheManagerFactory
    );

    Assert.assertEquals(6, ingestionSpecs.size());
    for (ParallelIndexIngestionSpec indexIngestionSpec : ingestionSpecs) {
      // Expect false since the rollup value in the metadata of existing segments is null
      Assert.assertFalse(indexIngestionSpec.getDataSchema().getGranularitySpec().isRollup());
    }
  }

  @Test
  public void testMultiValuedDimensionsProcessing()
      throws IOException
  {
    final Map<Interval, DataSchema> dataSchemasForIntervals = CompactionTask.createDataSchemasForIntervals(
        toolbox,
        LockGranularity.TIME_CHUNK,
        new SegmentProvider(DATA_SOURCE, new CompactionIntervalSpec(COMPACTION_INTERVAL, null)),
        null,
        null,
        null,
        new ClientCompactionTaskGranularitySpec(null, null, null),
        null,
        METRIC_BUILDER,
        true
    );
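    // The test segments contain no multi-valued dimensions, so the analysis should report none.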
    for (DataSchema dataSchema : dataSchemasForIntervals.values()) {
      Assert.assertTrue(dataSchema instanceof CombinedDataSchema);
      Assert.assertTrue(((CombinedDataSchema) dataSchema).getMultiValuedDimensions().isEmpty());
    }
  }

  @Test
  public void testMSQRollupWithNoDimensionsSpecNeedsMVDInfo()
  {
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
    builder.compactionRunner(new TestMSQCompactionRunner());
    final CompactionTask compactionTask = builder.build();
    // With granularitySpec=null, rollup is assumed possible, so MVD info is needed
    Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
  }

  @Test
  public void testMSQRangePartitionWithNoDimensionsSpecNeedsMVDInfo()
  {
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
    builder.compactionRunner(new TestMSQCompactionRunner());
    builder.tuningConfig(TuningConfigBuilder.forCompactionTask()
                                            .withForceGuaranteedRollup(true)
                                            .withPartitionsSpec(
                                                new DimensionRangePartitionsSpec(
                                                    3,
                                                    null,
                                                    ImmutableList.of(
                                                        "string_dim_1"),
                                                    false
                                                ))
                                            .build());
    final CompactionTask compactionTask = builder.build();
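    // Range partitioning with no dimensionsSpec may hit multi-valued strings, so MVD info is needed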
    Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
  }

  @Test
  public void testMSQRollupOnStringNeedsMVDInfo()
  {
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
    builder.compactionRunner(new TestMSQCompactionRunner());
    builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null, null, true));

    DimensionSchema stringDim = new StringDimensionSchema("string_dim_1", null, null);
    builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim)));
    final CompactionTask compactionTask = builder.build();
    // A string dimension with rollup=true should need MVD info
    Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
  }

  @Test
  public void testMSQRangePartitionOnStringNeedsMVDInfo()
  {
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
    builder.compactionRunner(new TestMSQCompactionRunner());

    DimensionSchema stringDim = new StringDimensionSchema("string_dim_1", null, null);
    builder.tuningConfig(TuningConfigBuilder.forCompactionTask()
                                            .withForceGuaranteedRollup(true)
                                            .withPartitionsSpec(
                                                new DimensionRangePartitionsSpec(
                                                    3,
                                                    null,
                                                    ImmutableList.of(
                                                        stringDim.getName()),
                                                    false
                                                ))
                                            .build());
    builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim)));
    CompactionTask compactionTask = builder.build();
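    // A classic string dimension may be multi-valued, so range partitioning on it needs MVD info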
    Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
  }

  @Test
  public void testMSQRangePartitionOnAutoStringDoesNotNeedMVDInfo()
  {
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, SegmentUtils.hashIds(SEGMENTS)));
    builder.compactionRunner(new TestMSQCompactionRunner());

    DimensionSchema stringDim = new AutoTypeColumnSchema("string_dim_1", ColumnType.STRING, null);
    builder.tuningConfig(TuningConfigBuilder.forCompactionTask()
                                            .withForceGuaranteedRollup(true)
                                            .withPartitionsSpec(
                                                new DimensionRangePartitionsSpec(
                                                    3,
                                                    null,
                                                    List.of(stringDim.getName()),
                                                    false
                                                )
                                            )
                                            .build());
    builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim)));
    CompactionTask compactionTask = builder.build();
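    // An AUTO-typed string column stores multi-values as arrays rather than multi-value strings, so no MVD info is needed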
    Assert.assertFalse(compactionTask.identifyMultiValuedDimensions());
  }

  @Test
  public void testChooseFinestGranularityWithNulls()
  {
    List<Granularity> input = Arrays.asList(
        Granularities.DAY,
        Granularities.SECOND,
        Granularities.MINUTE,
        Granularities.SIX_HOUR,
        Granularities.EIGHT_HOUR,
        Granularities.DAY,
        null,
        Granularities.ALL,
        Granularities.MINUTE
    );
    Assert.assertEquals(Granularities.SECOND, chooseFinestGranularityHelper(input));
  }

  @Test
  public void testChooseFinestGranularityNone()
  {
    List<Granularity> input = ImmutableList.of(
        Granularities.DAY,
        Granularities.SECOND,
        Granularities.MINUTE,
        Granularities.SIX_HOUR,
        Granularities.EIGHT_HOUR,
        Granularities.NONE,
        Granularities.DAY,
        Granularities.NONE,
        Granularities.MINUTE
    );
    Assert.assertEquals(Granularities.NONE, chooseFinestGranularityHelper(input));
  }

  @Test
  public void testChooseFinestGranularityAllNulls()
  {
    List<Granularity> input = Arrays.asList(
        null,
        null,
        null,
        null
    );
    Assert.assertNull(chooseFinestGranularityHelper(input));
  }

  @Test
  public void testGetDefaultLookupLoadingSpec()
  {
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    final CompactionTask task = builder
        .interval(Intervals.of("2000-01-01/2000-01-02"))
        .build();
    Assert.assertEquals(LookupLoadingSpec.NONE, task.getLookupLoadingSpec());
  }

  @Test
  public void testGetDefaultLookupLoadingSpecWithTransformSpec()
  {
    final Builder builder = new Builder(
        DATA_SOURCE,
        segmentCacheManagerFactory
    );
    final CompactionTask task = builder
        .interval(Intervals.of("2000-01-01/2000-01-02"))
        .transformSpec(new CompactionTransformSpec(new SelectorDimFilter("dim1", "foo", null)))
        .build();
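    // A transformSpec may reference lookups, so all lookups must be loaded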
    Assert.assertEquals(LookupLoadingSpec.ALL, task.getLookupLoadingSpec());
  }

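  /**
   * Folds the given granularities through {@link CompactionTask.ExistingSegmentAnalyzer#compareWithCurrent} and
   * returns the finest one seen, or null if every input is null.
   */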
  private Granularity chooseFinestGranularityHelper(List<Granularity> granularities)
  {
    SettableSupplier<Granularity> queryGranularity = new SettableSupplier<>();
    for (Granularity current : granularities) {
      queryGranularity.set(CompactionTask.ExistingSegmentAnalyzer.compareWithCurrent(queryGranularity.get(), current));
    }
    return queryGranularity.get();
  }

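  /**
   * Expected dimensions when the dimensionsSpec is auto-generated: the mixed-type column "string_to_double" appears
   * as a string dimension in the first four per-interval specs and as a double in the last two.
   */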
  private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration()
  {
    return ImmutableList.of(
        new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))),
        new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))),
        new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))),
        new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double", DimensionSchema.MultiValueHandling.ARRAY, null))),
        new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double"))),
        new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
    );
  }

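  /**
   * Builds the full dimension list shared by all expected specs, substituting the given schema for the mixed-type
   * column.
   */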
  private static List<DimensionSchema> getDimensionSchema(DimensionSchema mixedTypeColumn)
  {
    return Lists.newArrayList(
        new LongDimensionSchema("timestamp"),
        new StringDimensionSchema("string_dim_4", DimensionSchema.MultiValueHandling.ARRAY, null),
        new LongDimensionSchema("long_dim_4"),
        new FloatDimensionSchema("float_dim_4"),
        new DoubleDimensionSchema("double_dim_4"),
        new StringDimensionSchema("string_dim_0", DimensionSchema.MultiValueHandling.ARRAY, null),
        new LongDimensionSchema("long_dim_0"),
        new FloatDimensionSchema("float_dim_0"),
        new DoubleDimensionSchema("double_dim_0"),
        new StringDimensionSchema("string_dim_1", DimensionSchema.MultiValueHandling.ARRAY, null),
        new LongDimensionSchema("long_dim_1"),
        new FloatDimensionSchema("float_dim_1"),
        new DoubleDimensionSchema("double_dim_1"),
        new StringDimensionSchema("string_dim_2", DimensionSchema.MultiValueHandling.ARRAY, null),
        new LongDimensionSchema("long_dim_2"),
        new FloatDimensionSchema("float_dim_2"),
        new DoubleDimensionSchema("double_dim_2"),
        new StringDimensionSchema("string_dim_3", DimensionSchema.MultiValueHandling.ARRAY, null),
        new LongDimensionSchema("long_dim_3"),
        new FloatDimensionSchema("float_dim_3"),
        new DoubleDimensionSchema("double_dim_3"),
        new StringDimensionSchema("string_dim_5", DimensionSchema.MultiValueHandling.ARRAY, null),
        new LongDimensionSchema("long_dim_5"),
        new FloatDimensionSchema("float_dim_5"),
        new DoubleDimensionSchema("double_dim_5"),
        mixedTypeColumn
    );
  }

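  /**
   * Shorthand that delegates to the full overload below, filling in the tuningConfig these tests expect compaction
   * to produce.
   */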
  private void assertIngestionSchema(
      List<ParallelIndexIngestionSpec> ingestionSchemas,
      List<DimensionsSpec> expectedDimensionsSpecs,
      List<AggregatorFactory> expectedMetricsSpec,
      List<Interval> expectedSegmentIntervals,
      Granularity expectedSegmentGranularity,
      Granularity expectedQueryGranularity,
      boolean expectedDropExisting
  )
  {
    assertIngestionSchema(
        ingestionSchemas,
        expectedDimensionsSpecs,
        expectedMetricsSpec,
        expectedSegmentIntervals,
        TuningConfigBuilder
            .forCompactionTask()
            .withMaxRowsInMemory(500000)
            .withMaxBytesInMemory(1000000L)
            .withMaxTotalRows(Long.MAX_VALUE)
            .withPartitionsSpec(new HashedPartitionsSpec(5000000, null, null))
            .withIndexSpec(
                IndexSpec.builder()
                         .withBitmapSerdeFactory(RoaringBitmapSerdeFactory.getInstance())
                         .withDimensionCompression(CompressionStrategy.LZ4)
                         .withMetricCompression(CompressionStrategy.LZF)
                         .withLongEncoding(LongEncodingStrategy.LONGS)
                         .build()
            )
            .withForceGuaranteedRollup(true)
            .withReportParseExceptions(false)
            .withPushTimeout(5000L)
            .build(),
        expectedSegmentGranularity,
        expectedQueryGranularity,
        expectedDropExisting
    );
  }

  private void assertIngestionSchema(
      List<ParallelIndexIngestionSpec> ingestionSchemas,
      List<DimensionsSpec> expectedDimensionsSpecs,
      List<AggregatorFactory> expectedMetricsSpec,
      List<Interval> expectedSegmentIntervals,
      CompactionTask.CompactionTuningConfig expectedTuningConfig,
      Granularity expectedSegmentGranularity,
      Granularity expectedQueryGranularity,
      boolean expectedDropExisting
  )
  {
    Preconditions.checkArgument(
        ingestionSchemas.size() == expectedDimensionsSpecs.size(),
        "ingesionSchemas.size()[%s] should be same with expectedDimensionsSpecs.size()[%s]",
        ingestionSchemas.size(),
        expectedDimensionsSpecs.size()
    );

    for (int i = 0; i < ingestionSchemas.size(); i++) {
      final ParallelIndexIngestionSpec ingestionSchema = ingestionSchemas.get(i);
      final DimensionsSpec expectedDimensionsSpec = expectedDimensionsSpecs.get(i);

      // assert dataSchema
      final DataSchema dataSchema = ingestionSchema.getDataSchema();
      Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());

      Assert.assertEquals(
          new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
          dataSchema.getTimestampSpec()
      );

      Assert.assertEquals(
          new HashSet<>(expectedDimensionsSpec.getDimensions()),
          new HashSet<>(dataSchema.getDimensionsSpec().getDimensions())
      );

      // metrics
      Assert.assertEquals(expectedMetricsSpec, Arrays.asList(dataSchema.getAggregators()));
      Assert.assertEquals(
          new UniformGranularitySpec(
              expectedSegmentGranularity,
              expectedQueryGranularity,
              false,
              Collections.singletonList(expectedSegmentIntervals.get(i))
          ),
          dataSchema.getGranularitySpec()
      );

      // assert ioConfig
      final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig();
      Assert.assertFalse(ioConfig.isAppendToExisting());
      Assert.assertEquals(
          expectedDropExisting,
          ioConfig.isDropExisting()
      );
      final InputSource inputSource = ioConfig.getInputSource();
      Assert.assertTrue(inputSource instanceof DruidInputSource);
      final DruidInputSource druidInputSource = (DruidInputSource) inputSource;
      Assert.assertEquals(DATA_SOURCE, druidInputSource.getDataSource());
      Assert.assertEquals(expectedSegmentIntervals.get(i), druidInputSource.getInterval());
      Assert.assertNull(druidInputSource.getDimFilter());

      // assert tuningConfig
      Assert.assertEquals(expectedTuningConfig, ingestionSchema.getTuningConfig());
    }
  }

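  /**
   * Coordinator client stub that returns the keys of the given segment map as the used segments of any datasource
   * and interval.
   */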
  private static class TestCoordinatorClient extends NoopCoordinatorClient
  {
    private final Map<DataSegment, File> segmentMap;

    TestCoordinatorClient(Map<DataSegment, File> segmentMap)
    {
      this.segmentMap = segmentMap;
    }

    @Override
    public ListenableFuture<List<DataSegment>> fetchUsedSegments(
        String dataSource,
        List<Interval> intervals
    )
    {
      return Futures.immediateFuture(ImmutableList.copyOf(segmentMap.keySet()));
    }
  }

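  /**
   * Creates a task toolbox whose segment cache manager serves the prebuilt in-memory indexes instead of fetching
   * real segment files.
   */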
  private TaskToolbox makeTaskToolbox(
      TaskActionClient taskActionClient,
      TestIndexIO indexIO,
      Map<DataSegment, File> segments
  )
  {
    final SegmentCacheManager segmentCacheManager = new NoopSegmentCacheManager()
    {
      @Override
      public void load(DataSegment segment)
      {
        // Do nothing.
      }

      @Override
      public Optional<Segment> acquireCachedSegment(DataSegment dataSegment)
      {
        return Optional.of(
            new QueryableIndexSegment(indexIO.loadIndex(segments.get(dataSegment)), dataSegment.getId())
        );
      }

      @Override
      public void drop(DataSegment segment)
      {
        // Do nothing.
      }
    };

    final TaskConfig config = new TaskConfigBuilder()
        .build();
    return new TaskToolbox.Builder()
        .config(config)
        .taskActionClient(taskActionClient)
        .joinableFactory(NoopJoinableFactory.INSTANCE)
        .indexIO(indexIO)
        .indexMerger(new IndexMergerV9(
            OBJECT_MAPPER,
            indexIO,
            OffHeapMemorySegmentWriteOutMediumFactory.instance(),
            true
        ))
        .taskReportFileWriter(new NoopTestTaskReportFileWriter())
        .authorizerMapper(AuthTestUtils.TEST_AUTHORIZER_MAPPER)
        .chatHandlerProvider(new NoopChatHandlerProvider())
        .rowIngestionMetersFactory(TEST_UTILS.getRowIngestionMetersFactory())
        .appenderatorsManager(new TestAppenderatorsManager())
        .coordinatorClient(COORDINATOR_CLIENT)
        .segmentCacheManager(segmentCacheManager)
        .taskLogPusher(null)
        .attemptId("1")
        .emitter(emitter)
        .centralizedTableSchemaConfig(CentralizedDatasourceSchemaConfig.create())
        .build();
  }

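  /**
   * Task action client that only answers {@link RetrieveUsedSegmentsAction}, always with the fixed segment list.
   */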
  private static class TestTaskActionClient implements TaskActionClient
  {
    private final List<DataSegment> segments;

    TestTaskActionClient(List<DataSegment> segments)
    {
      this.segments = segments;
    }

    @SuppressWarnings("unchecked")
    @Override
    public <RetType> RetType submit(TaskAction<RetType> taskAction)
    {
      if (!(taskAction instanceof RetrieveUsedSegmentsAction)) {
        throw new ISE("action[%s] is not supported", taskAction);
      }
      return (RetType) segments;
    }
  }

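  /**
   * IndexIO stub that maps each segment file to an in-memory {@link SimpleQueryableIndex} assembled from the test
   * dimensions and aggregators.
   */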
  private static class TestIndexIO extends IndexIO
  {
    private final Map<File, QueryableIndex> queryableIndexMap;

    TestIndexIO(
        ObjectMapper mapper,
        Map<DataSegment, File> segmentFileMap
    )
    {
      super(mapper, ColumnConfig.DEFAULT);

      queryableIndexMap = Maps.newHashMapWithExpectedSize(segmentFileMap.size());
      for (Entry<DataSegment, File> entry : segmentFileMap.entrySet()) {
        final DataSegment segment = entry.getKey();
        final List<String> columnNames = new ArrayList<>(segment.getDimensions().size() + segment.getMetrics().size());
        columnNames.add(ColumnHolder.TIME_COLUMN_NAME);
        columnNames.addAll(segment.getDimensions());
        columnNames.addAll(segment.getMetrics());
        final Map<String, Supplier<BaseColumnHolder>> columnMap = Maps.newHashMapWithExpectedSize(columnNames.size());
        final List<AggregatorFactory> aggregatorFactories = new ArrayList<>(segment.getMetrics().size());

        for (String columnName : columnNames) {
          if (MIXED_TYPE_COLUMN.equals(columnName)) {
            BaseColumnHolder columnHolder = createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval()));
            columnMap.put(columnName, () -> columnHolder);
          } else if (DIMENSIONS.containsKey(columnName)) {
            BaseColumnHolder columnHolder = createColumn(DIMENSIONS.get(columnName));
            columnMap.put(columnName, () -> columnHolder);
          } else {
            final Optional<AggregatorFactory> maybeMetric = AGGREGATORS.stream()
                                                                       .filter(agg -> agg.getName().equals(columnName))
                                                                       .findAny();
            if (maybeMetric.isPresent()) {
              BaseColumnHolder columnHolder = createColumn(maybeMetric.get());
              columnMap.put(columnName, () -> columnHolder);
              aggregatorFactories.add(maybeMetric.get());
            }
          }
        }

        queryableIndexMap.put(
            entry.getValue(),
            new SimpleQueryableIndex(
                segment.getInterval(),
                new ListIndexed<>(segment.getDimensions()),
                null,
                columnMap,
                null
            )
            {
              @Override
              public Metadata getMetadata()
              {
                return new Metadata(
                    null,
                    aggregatorFactories.toArray(new AggregatorFactory[0]),
                    null,
                    null,
                    null,
                    null,
                    null
                );
              }
            }
        );
      }
    }

    @Override
    public QueryableIndex loadIndex(File file)
    {
      return queryableIndexMap.get(file);
    }

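    /**
     * Replaces the index for the given file with a copy whose getMetadata() returns null, simulating a segment
     * written without metadata.
     */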
    void removeMetadata(File file)
    {
      final SimpleQueryableIndex index = (SimpleQueryableIndex) queryableIndexMap.get(file);
      if (index != null) {
        queryableIndexMap.put(
            file,
            new SimpleQueryableIndex(
                index.getDataInterval(),
                index.getAvailableDimensions(),
                index.getBitmapFactoryForDimensions(),
                index.getColumns(),
                index.getFileMapper()
            )
            {
              @Override
              public Metadata getMetadata()
              {
                return null;
              }
            });
      }
    }

    Map<File, QueryableIndex> getQueryableIndexMap()
    {
      return queryableIndexMap;
    }
  }

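  /**
   * Creates a fake column holder whose capabilities reflect the type of the given dimension schema or aggregator.
   */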
  private static BaseColumnHolder createColumn(DimensionSchema dimensionSchema)
  {
    return new TestColumn(dimensionSchema.getColumnType());
  }

  private static BaseColumnHolder createColumn(AggregatorFactory aggregatorFactory)
  {
    return new TestColumn(aggregatorFactory.getIntermediateType());
  }

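  /**
   * Minimal column holder that only reports capabilities and a fixed row count; everything else returns null.
   */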
  private static class TestColumn implements BaseColumnHolder
  {
    private final ColumnCapabilities columnCapabilities;

    TestColumn(ColumnType type)
    {
      columnCapabilities = new ColumnCapabilitiesImpl()
          .setType(type)
          .setDictionaryEncoded(type.is(ValueType.STRING)) // fake value so that string columns appear dictionary-encoded
          .setHasBitmapIndexes(type.is(ValueType.STRING))
          .setHasSpatialIndexes(false)
          .setHasMultipleValues(false);
    }

    @Override
    public ColumnCapabilities getCapabilities()
    {
      return columnCapabilities;
    }

    @Override
    public int getLength()
    {
      return NUM_ROWS_PER_SEGMENT;
    }

    @Override
    public BaseColumn getColumn()
    {
      return null;
    }

    @Nullable
    @Override
    public ColumnIndexSupplier getIndexSupplier()
    {
      return null;
    }

    @Override
    public SettableColumnValueSelector<?> makeNewSettableColumnValueSelector()
    {
      return null;
    }
  }

  /**
   * Mimics validation with MSQCompactionRunner behavior, since the actual class resides in the MSQ extension.
   * Since the validations depend only on the type of the runner, all overridden methods simply return null.
   */
  private static class TestMSQCompactionRunner implements CompactionRunner
  {
    @Override
    public TaskStatus runCompactionTasks(
        CompactionTask compactionTask,
        Map<Interval, DataSchema> intervalDataSchemaMap,
        TaskToolbox taskToolbox
    )
    {
      return null;
    }

    @Override
    public CurrentSubTaskHolder getCurrentSubTaskHolder()
    {
      return null;
    }

    @Override
    public CompactionConfigValidationResult validateCompactionTask(
        CompactionTask compactionTask,
        Map<Interval, DataSchema> intervalToDataSchemaMap
    )
    {
      return null;
    }
  }

  /**
   * The compaction task spec in 0.16.0, except for the tuningConfig.
   * The original spec accepts only {@link IndexTuningConfig}, but this class accepts any type of tuningConfig for
   * testing.
   */
  private static class OldCompactionTaskWithAnyTuningConfigType extends AbstractTask
  {
    private final Interval interval;
    private final List<DataSegment> segments;
    @Nullable
    private final DimensionsSpec dimensionsSpec;
    @Nullable
    private final AggregatorFactory[] metricsSpec;
    @Nullable
    private final Granularity segmentGranularity;
    @Nullable
    private final Long targetCompactionSizeBytes;
    @Nullable
    private final TuningConfig tuningConfig;

    @JsonCreator
    public OldCompactionTaskWithAnyTuningConfigType(
        @JsonProperty("id") final String id,
        @JsonProperty("resource") final TaskResource taskResource,
        @JsonProperty("dataSource") final String dataSource,
        @JsonProperty("interval") @Nullable final Interval interval,
        @JsonProperty("segments") @Nullable final List<DataSegment> segments,
        @JsonProperty("dimensions") @Nullable final DimensionsSpec dimensions,
        @JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
        @JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
        @JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity,
        @JsonProperty("targetCompactionSizeBytes") @Nullable final Long targetCompactionSizeBytes,
        @JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
        @JsonProperty("context") @Nullable final Map<String, Object> context,
        @JacksonInject ObjectMapper jsonMapper,
        @JacksonInject AuthorizerMapper authorizerMapper,
        @JacksonInject ChatHandlerProvider chatHandlerProvider,
        @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
        @JacksonInject CoordinatorClient coordinatorClient,
        @JacksonInject SegmentCacheManagerFactory segmentCacheManagerFactory,
        @JacksonInject AppenderatorsManager appenderatorsManager
    )
    {
      super(
          getOrMakeId(id, "compact", dataSource),
          null,
          taskResource,
          dataSource,
          context,
          IngestionMode.REPLACE_LEGACY
      );
      this.interval = interval;
      this.segments = segments;
      this.dimensionsSpec = dimensionsSpec;
      this.metricsSpec = metricsSpec;
      this.segmentGranularity = segmentGranularity;
      this.targetCompactionSizeBytes = targetCompactionSizeBytes;
      this.tuningConfig = tuningConfig;
    }

    @Override
    public String getType()
    {
      return "compact";
    }

    @Nonnull
    @JsonIgnore
    @Override
    public Set<ResourceAction> getInputSourceResources()
    {
      return ImmutableSet.of();
    }

    @JsonProperty
    public Interval getInterval()
    {
      return interval;
    }

    @JsonProperty
    public List<DataSegment> getSegments()
    {
      return segments;
    }

    @JsonProperty
    @Nullable
    public DimensionsSpec getDimensionsSpec()
    {
      return dimensionsSpec;
    }

    @JsonProperty
    @Nullable
    public AggregatorFactory[] getMetricsSpec()
    {
      return metricsSpec;
    }

    @JsonProperty
    @Nullable
    public Granularity getSegmentGranularity()
    {
      return segmentGranularity;
    }

    @Nullable
    @JsonProperty
    public Long getTargetCompactionSizeBytes()
    {
      return targetCompactionSizeBytes;
    }

    @Nullable
    @JsonProperty
    public TuningConfig getTuningConfig()
    {
      return tuningConfig;
    }

    @Override
    public boolean isReady(TaskActionClient taskActionClient)
    {
      throw new UnsupportedOperationException();
    }

    @Override
    public void stopGracefully(TaskConfig taskConfig)
    {
      throw new UnsupportedOperationException();
    }

    @Override
    public TaskStatus runTask(TaskToolbox toolbox)
    {
      throw new UnsupportedOperationException();
    }
  }
}
